/*
 * Under Flink's class-loading rules, when two classes have the same name,
 * the class from your own project is loaded first.
 */

package org.apache.flink.api.common.eventtime;

import org.apache.flink.annotation.Public;

import java.time.Duration;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * A WatermarkGenerator for situations where records are out of order, but you can place an upper
 * bound on how far the events are out of order. An out-of-order bound B means that once an event
 * with timestamp T was encountered, no events older than {@code T - B} will follow any more.
 *
 * <p>This variant emits a watermark from {@link #onEvent} for every record it receives (a
 * punctuated style of watermarking); {@link #onPeriodicEmit} intentionally emits nothing. The
 * emitted watermark is always {@code maxTimestamp - B - 1}, where {@code maxTimestamp} is the
 * largest event timestamp seen so far.
 *
 * <p>NOTE(review): this class sits in Flink's own package under the name of a Flink class,
 * presumably so that it replaces the library version at class-loading time - confirm.
 */
@Public
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {

    /** The maximum timestamp encountered so far. */
    private long maxTimestamp;

    /** The maximum out-of-orderness that this watermark generator assumes, in milliseconds. */
    private final long outOfOrdernessMillis;

    /**
     * Creates a new watermark generator with the given out-of-orderness bound.
     *
     * @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps; must be
     *     non-null and non-negative.
     */
    public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
        checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
        checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");

        final long boundMillis = maxOutOfOrderness.toMillis();
        this.outOfOrdernessMillis = boundMillis;

        // Seed the maximum so that the first computed watermark is exactly Long.MIN_VALUE:
        // (Long.MIN_VALUE + bound + 1) - bound - 1 == Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + boundMillis + 1;
    }

    // ------------------------------------------------------------------------

    /**
     * Called once for every record. Updates the largest timestamp seen so far and immediately
     * emits the resulting watermark.
     *
     * @param event The record itself (not inspected here).
     * @param eventTimestamp The time attribute carried by the record.
     * @param output The output to which the watermark is emitted.
     */
    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        // Only ever move the tracked maximum forward. This keeps the maximum, and therefore the
        // emitted watermark, monotonically non-decreasing: time never goes backwards.
        if (eventTimestamp > maxTimestamp) {
            maxTimestamp = eventTimestamp;
        }

        output.emitWatermark(currentWatermark());
    }

    /**
     * Periodic emission hook (the original author notes a default period of 200 ms). Deliberately
     * a no-op: watermarks are emitted per record in {@link #onEvent} instead.
     *
     * @param output Unused.
     */
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Intentionally empty; see onEvent.
    }

    /** Builds the watermark implied by the largest timestamp seen so far. */
    private Watermark currentWatermark() {
        return new Watermark(maxTimestamp - outOfOrdernessMillis - 1);
    }
}
